Linux perf子系统的使用(二)——采样(signal方式)

在上一篇《Linux perf子系统的使用(一)——计数》已经讲解了如何使用perf_event_open、read和ioctl对perf子系统进行编程。但有时我们并不需要计数,而是要采样。比如这么一个需求:统计一个程序中哪些函数最耗时间。嗯,这个功能确实可以通过perf record命令来做,但是perf record内部又是如何做到的呢?自己实现又是怎样的呢?perf record是基于统计学原理的。假设以1000Hz的频率对某个进程采样,每次采样记录下该进程的IP寄存器的值(也就是下一条指令的地址)。通过分析该进程的可执行文件,是可以得知每次采样的IP值处于哪个函数内部。OK,那么我们相当于以1000Hz的频率获知进程当前所执行的函数。如果某个函数f()占用了30%的时间,那么所有采样中,该函数出现的频率也应该将近30%,只要采样数量足够多。这正是perf record的原理。所以,perf的采样模式很有用~

但是,采样比较复杂,主要表现在三点:1、采样需要设置触发源,也就是告诉kernel何时进行一次采样;2、采样需要设置信号,也就是告诉kernnel,采样完成后通知谁;3、采样值的读取需要使用mmap,因为采样有异步性,需要一个环形队列,另外也是出于性能的考虑。

直接上代码吧,对照着官方手册看,学习效率最高:

perf.c

//如果不加,则F_SETSIG未定义
#define _GNU_SOURCE 1

#include <stdio.h>
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/perf_event.h>

//环形缓冲区大小,16页,即16*4kB
#define RING_BUFFER_PAGES 16

//目前perf_event_open在glibc中没有封装,需要手工封装一下
int perf_event_open(struct perf_event_attr *attr,pid_t pid,int cpu,int group_fd,unsigned long flags)
{
    return syscall(__NR_perf_event_open,attr,pid,cpu,group_fd,flags);
}

//mmap共享内存的开始地址
void* rbuf;

//环形队列中每一项元素
struct perf_my_sample
{
    struct perf_event_header header;
    uint64_t ip;
};

//下一条采样记录的相对于环形缓冲区开头的偏移量
uint64_t next_offset=0;

//采样完成后的信号处理函数
void sample_handler(int sig_num,siginfo_t *sig_info,void *context)
{
    //计算出最新的采样所在的位置(相对于rbuf的偏移量)
    uint64_t offset=4096+next_offset;
    //指向最新的采样
    struct perf_my_sample* sample=(void*)((uint8_t*)rbuf+offset);
    //过滤一下记录
    if(sample->header.type==PERF_RECORD_SAMPLE)
    {
        //得到IP值
        printf("%lx\n",sample->ip);
    }
    //共享内存开头是一个struct perf_event_mmap_page,提供环形缓冲区的信息
    struct perf_event_mmap_page* rinfo=rbuf;
    //手工wrap一下data_head值,得到下一个记录的偏移量
    next_offset=rinfo->data_head%(RING_BUFFER_PAGES*4096);
}

//模拟的一个负载
void workload()
{
    int i,c=0;
    for(i=0;i<100000000;i++)
    {
        c+=i*i;
        c-=i*100;
        c+=i*i*i/100;
    }
}

int main()
{
    struct perf_event_attr attr;
    memset(&attr,0,sizeof(struct perf_event_attr));
    attr.size=sizeof(struct perf_event_attr);
    //触发源为CPU时钟
    attr.type=PERF_TYPE_SOFTWARE;
    attr.config=PERF_COUNT_SW_CPU_CLOCK;
    //每100000个CPU时钟采样一次
    attr.sample_period=100000;
    //采样目标是IP
    attr.sample_type=PERF_SAMPLE_IP;
    //初始化为禁用
    attr.disabled=1;
    int fd=perf_event_open(&attr,0,-1,-1,0);
    if(fd<0)
    {
        perror("Cannot open perf fd!");
        return 1;
    }
    //创建1+16页共享内存,应用程序只读,读取fd产生的内容
    rbuf=mmap(0,(1+RING_BUFFER_PAGES)*4096,PROT_READ,MAP_SHARED,fd,0);
    if(rbuf<0)
    {
        perror("Cannot mmap!");
        return 1;
    }
    //这三个fcntl为何一定这么设置不明,但必须这样
    fcntl(fd,F_SETFL,O_RDWR|O_NONBLOCK|O_ASYNC);
    fcntl(fd,F_SETSIG,SIGIO);
    fcntl(fd,F_SETOWN,getpid());
    //开始设置采样完成后的信号通知
    struct sigaction sig;
    memset(&sig,0,sizeof(struct sigaction));
    //由sample_handler来处理采样完成事件
    sig.sa_sigaction=sample_handler;
    //要带上siginfo_t参数(因为perf子系统会传入参数,包括fd)
    sig.sa_flags=SA_SIGINFO;
    if(sigaction(SIGIO,&sig,0)<0)
    {
        perror("Cannot sigaction");
        return 1;
    }
    //开始监测
    ioctl(fd,PERF_EVENT_IOC_RESET,0);
    ioctl(fd,PERF_EVENT_IOC_ENABLE,0);
    workload();
    //停止监测
    ioctl(fd,PERF_EVENT_IOC_DISABLE,0);
    munmap(rbuf,(1+RING_BUFFER_PAGES)*4096);
    close(fd);
    return 0;
}

可以看到一下子比计数模式复杂多了。采样模式是要基于计数模式的——选择一个“参考计数器”,并设置一个阈值,每当这个“参考计数器”达到阈值时,触发一次采样。每次采样,kernel会把值放入队列的末尾。如何得知kernenl完成了一次最新的采样了呢?一种方法就是定时轮询,另一种就是响应信号。

如何读取mmap共享内存中的值呢?首先,共享内存开头是一个struct perf_event_mmap_page,提供环形缓冲区的信息,对我们最重要的字段就是data_head,官方手册的介绍是这样的:

注意,data_head一直递增,不回滚!!所以需要手动处理wrap。另外一个需要注意的地方是,每次事件响应中,得到的data_head是下一次采样的队列头部,所以需要自己保存一个副本next_offset,以供下次使用。

这个struct perf_event_mmap_page独占共享内存的第一页。后面必须跟2^n页,n自己决定。这2^n页用来存放采样记录。每一条记录的结构体如下:

因为我只选择了采样IP,即PERF_SAMPLE_IP,所以这个结构体就退化为了:

struct perf_my_sample
{
    struct perf_event_header header;
    uint64_t ip;
};

另外一个需要注意的地方是mmap中的第三个参数,是PROT_READ,表示应用程序只读。如果设置为了PROT_READ|PROT_WRITE,那么读取的过程就不一样了:

这样相当于和kernel做一个同步操作,效率务必下降。而且由于SIGIO这个信号是不可靠信号,所以如果某次采样完成的通知没有被截获,那么就可能产生死锁。

gcc perf.c -o perf
sudo ./perf

运行上面的代码,产生如下输出:

为了验证采集到的IP值是否正确,可以反汇编一下:

objdump -d ./perf

可以看到采集到的IP值全部落在workload这个函数的地址范围内。

要采多个值的话,也很方便:

//如果不加,则F_SETSIG未定义
#define _GNU_SOURCE 1

#include <stdio.h>
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/perf_event.h>

//环形缓冲区大小,16页,即16*4kB
#define RING_BUFFER_PAGES 2

//目前perf_event_open在glibc中没有封装,需要手工封装一下
int perf_event_open(struct perf_event_attr *attr,pid_t pid,int cpu,int group_fd,unsigned long flags)
{
    return syscall(__NR_perf_event_open,attr,pid,cpu,group_fd,flags);
}

//mmap共享内存的开始地址
void* rbuf;

//环形队列中每一项元素
struct perf_my_sample
{
    struct perf_event_header header;
    uint64_t ip;
    uint64_t nr;
    uint64_t ips[0];
};

//下一条采样记录的相对于环形缓冲区开头的偏移量
uint64_t next_offset=0;

//采样完成后的信号处理函数
void sample_handler(int sig_num,siginfo_t *sig_info,void *context)
{
    //计算出最新的采样所在的位置(相对于rbuf的偏移量)
    uint64_t offset=4096+next_offset;
    //指向最新的采样
    struct perf_my_sample* sample=(void*)((uint8_t*)rbuf+offset);
    //过滤一下记录
    if(sample->header.type==PERF_RECORD_SAMPLE)
    {
        //得到IP值
        printf("IP: %lx\n",sample->ip);
        if(sample->nr<1024)
        {
            //得到调用链长度
            printf("Call Depth: %lu\n",sample->nr);
            //遍历调用链
            int i;
            for(i=0;i<sample->nr;i++)
                printf("  %lx\n",sample->ips[i]);
    	}
    }
    //共享内存开头是一个struct perf_event_mmap_page,提供环形缓冲区的信息
    struct perf_event_mmap_page* rinfo=rbuf;
    //手工wrap一下data_head值,得到下一个记录的偏移量
    next_offset=rinfo->data_head%(RING_BUFFER_PAGES*4096);
}

//模拟的一个负载
void workload()
{
    int i,c=0;
    for(i=0;i<1000000000;i++)
    {
        c+=i*i;
        c-=i*100;
        c+=i*i*i/100;
    }
}

int main()
{
    struct perf_event_attr attr;
    memset(&attr,0,sizeof(struct perf_event_attr));
    attr.size=sizeof(struct perf_event_attr);
    //触发源为CPU时钟
    attr.type=PERF_TYPE_SOFTWARE;
    attr.config=PERF_COUNT_SW_CPU_CLOCK;
    //每100000个CPU时钟采样一次
    attr.sample_period=100000;
    //采样目标是IP
    attr.sample_type=PERF_SAMPLE_IP|PERF_SAMPLE_CALLCHAIN;
    //初始化为禁用
    attr.disabled=1;
    int fd=perf_event_open(&attr,0,-1,-1,0);
    if(fd<0)
    {
        perror("Cannot open perf fd!");
        return 1;
    }
    //创建1+16页共享内存,应用程序只读,读取fd产生的内容
    rbuf=mmap(0,(1+RING_BUFFER_PAGES)*4096,PROT_READ,MAP_SHARED,fd,0);
    if(rbuf<0)
    {
        perror("Cannot mmap!");
        return 1;
    }
    //这三个fcntl为何一定这么设置不明,但必须这样
    fcntl(fd,F_SETFL,O_RDWR|O_NONBLOCK|O_ASYNC);
    fcntl(fd,F_SETSIG,SIGIO);
    fcntl(fd,F_SETOWN,getpid());
    //开始设置采样完成后的信号通知
    struct sigaction sig;
    memset(&sig,0,sizeof(struct sigaction));
    //由sample_handler来处理采样完成事件
    sig.sa_sigaction=sample_handler;
    //要带上siginfo_t参数(因为perf子系统会传入参数,包括fd)
    sig.sa_flags=SA_SIGINFO;
    if(sigaction(SIGIO,&sig,0)<0)
    {
        perror("Cannot sigaction");
        return 1;
    }
    //开始监测
    ioctl(fd,PERF_EVENT_IOC_RESET,0);
    ioctl(fd,PERF_EVENT_IOC_ENABLE,0);
    workload();
    //停止监测
    ioctl(fd,PERF_EVENT_IOC_DISABLE,0);
    munmap(rbuf,(1+RING_BUFFER_PAGES)*4096);
    close(fd);
    return 0;
}